Passed
Push — master ( 8441a2...76cae4 )
by Michael
02:05
created

WebSocketClient.init_flush   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, once, T } from 'pepka'
6
7
const MAX_32 = 2**31 - 1
8
const zipnum = new Zipnum()
9
10
type EventHandler<T extends keyof WebSocketEventMap> = AnyFunc<any, [WebSocketEventMap[T]]>
11
type EventHandlers = {
12
  open: EventHandler<'open'>[]
13
  close: EventHandler<'close'>[]
14
  error: EventHandler<'error'>[]
15
  message: AnyFunc<any, [WebSocketEventMap['message'] & {data: any}]>[]
16
  timeout: AnyFunc<any, [data: any]>[]
17
}
18
19
class WebSocketClient {
20
  private open = false
21
  private ws: wsc.Socket|null = null
22
  private forcibly_closed = false
23
  private reconnect_timeout: NodeJS.Timeout|null = null
24
  private queue = {}
25
  private messages: any[] = []
26
  private onReadyQueue: AnyFunc[] = []
27
  private onCloseQueue: AnyFunc[] = []
28
  private handlers: EventHandlers = { open: [], close: [], message: [], error: [], timeout: [] }
29
  private config = <wsc.Config>{}
30
31
  private init_flush(): void {
32
    this.queue    = {}  // data queuse
33
    this.messages = []  // send() queue
34
  }
35
  private call(event_name: wsc.WSEvent, ...args: any[]) {
36
    // this.handlers.open[0]()
37
    for(const h of this.handlers[event_name]) h(...args)
38
  }
39
40
  private log(event: string, message: any = null, time: number|null = null): void {
41
    const config = this.config
42
    if(time !== null) {
43
      config.log(event, time, message)
44
    } else {
45
      if(config.timer) {
46
        config.log(event, null, message)
47
      } else {
48
        config.log(event, message)
49
      }
50
    }
51
  }
52
53
  private initSocket(ws: wsc.Socket) {
54
    const config = this.config
55
    this.open = true
56
    this.onReadyQueue.forEach((fn: Function) => fn())
57
    this.onReadyQueue.splice(0)
58
    const {id_key, data_key} = config.server
59
    // Works also on previously opened sockets that do not fire 'open' event.
60
    this.call('open', ws)
61
    // Send all pending messages.
62
    this.messages.forEach((message: any) => message.send())
63
    // It's reconnecting.
64
    if(this.reconnect_timeout !== null) {
65
      clearInterval(this.reconnect_timeout)
66
      this.reconnect_timeout = null
67
    }
68
    if(config.ping) {
69
      const ping_interval = setInterval(() => {
70
        if(this.open) this.send(config.ping.content)
71
        if(this.forcibly_closed) clearInterval(ping_interval)
72
      }, config.ping.interval*1e3)
73
    }
74
    add_event(ws, 'close', async (...e) => {
75
      this.log('close')
76
      this.open = false
77
      this.onCloseQueue.forEach((fn: Function) => fn())
78
      this.onCloseQueue.splice(0)
79
      this.call('close', ...e)
80
      // Auto reconnect.
81
      const reconnect = config.reconnect
82
      if(
83
        typeof reconnect === 'number' &&
84
        !isNaN(reconnect) &&
85
        !this.forcibly_closed
86
      ) {
87
        const reconnectFunc = async () => {
88
          this.log('reconnect')
89
          if(this.ws !== null) {
90
            this.ws.close()
91
            this.ws = null
92
          }
93
          // If some error occured, try again.
94
          const status = await this.connect()
95
          if(status !== null)
96
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect * 1000)
97
        }
98
        // TODO: test normal close by server. Would it be infinite ?
99
        reconnectFunc()
100
      } else {
101
        this.ws = null
102
        this.open = false
103
      }
104
      // reset the flag to reuse.
105
      this.forcibly_closed = false
106
    })
107
    add_event(ws, 'message', (e) => {
108
      try {
109
        const data = config.decode(e.data)
110
        this.call('message', {...e, data})
111
        if(data[id_key]) {
112
          const q = this.queue[data[id_key]]
113
          if(q) {
114
            // Debug, Log.
115
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
116
            this.log('message', data[data_key], time)
117
            // Play.
118
            q.ff(data[data_key])
119
            clearTimeout(q.timeout)
120
            delete this.queue[data[id_key]]
121
          }
122
        }
123
      } catch (err) {
124
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
125
      }
126
    })
127
  }
128
129
  private connect() { // returns status if won't open or null if ok.
130
    return new Promise((ff) => {
131
      if(this.open === true) {
132
        return ff(null)
133
      }
134
      const config = this.config
135
      const ws = config.socket || config.adapter(config.url, config.protocols)
136
      this.ws = ws
137
      if(!ws || ws.readyState > 1) {
138
        this.ws = null
139
        this.log('error', 'ready() on closing or closed state! status 2.')
140
        return ff(2)
141
      }
142
      const ffo = once(ff)
143
      add_event(ws, 'error', once((e) => {
144
        this.log('error', 'status 3.')
145
        this.call('error', e)
146
        this.ws = null
147
        // Some network error: Connection refused or so.
148
        ffo(3)
149
      }))
150
      // Because 'open' won't be envoked on opened socket.
151
      if(ws.readyState) {
152
        this.initSocket(ws)
153
        ffo(null)
154
      } else {
155
        add_event(ws, 'open', once(() => {
156
          this.log('open')
157
          this.initSocket(ws)
158
          ffo(null)
159
        }))
160
      }
161
    })
162
  }
163
  public get socket() { return this.ws }
164
  public async ready() {
165
    return new Promise<void>((ff) => {
166
      if(this.open) {
167
        ff()
168
      } else {
169
        this.onReadyQueue.push(ff)
170
      }
171
    })
172
  }
173
  public on(
174
    event_name: wsc.WSEvent,
175
    handler: (data: any) => any,
176
    predicate: (data: any) => boolean = T,
177
    raw = false
178
  ) {
179
    const _handler: wsc.EventHandler = (event) =>
180
      predicate(event) && handler(event)
181
    return raw
182
      ? add_event(this.ws as wsc.Socket, event_name, _handler)
183
      : this.handlers[event_name].push(_handler)
184
  }
185
186
  public async close(): wsc.AsyncErrCode {
187
    return new Promise((ff, rj) => {
188
      if(this.ws === null) {
189
        rj('WSP: closing a non-inited socket!')
190
      } else {
191
        this.open = false
192
        this.onCloseQueue.push(() => {
193
          this.init_flush()
194
          this.ws = null
195
          this.forcibly_closed = true
196
          ff(null)
197
        })
198
        this.ws.close()
199
      }
200
    })
201
  }
202
203
  /**  .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
204
    returns a Promise that will be rejected after a timeout or
205
    resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
206
  */
207
  public async send<RequestDataType = any, ResponseDataType = any>(
208
    message_data: RequestDataType,
209
    opts = <wsc.SendOptions>{}
210
  ): Promise<ResponseDataType> {
211
    this.log('send', message_data)
212
    const config   = this.config
213
    const message  = {}
214
    const data_key = config.server.data_key
215
    const first_time_lazy = config.lazy && !this.open
216
217
    const message_id = zipnum.zip((Math.random()*(MAX_32-10))|0)
218
    if(typeof opts.top === 'object') {
219
      if(opts.top[data_key]) {
220
        throw new Error('Attempting to set data key/token via send() options!')
221
      }
222
      Object.assign(message, opts.top)
223
    }
224
    config.pipes.forEach((pipe) => message_data = pipe(message_data))
225
226
    if(this.open === true) {
227
      (this.ws as wsc.Socket).send(config.encode(message_id, message_data, config))
228
    } else if(this.open === false || first_time_lazy) {
229
      this.messages.push({
230
        send: () => (this.ws as wsc.Socket).send(config.encode(message_id, message_data, config))
231
      })
232
      if(first_time_lazy) this.connect()
233
    } else if(this.open === null) {
234
      throw new Error('Attempting to send via closed WebSocket connection!')
235
    }
236
237
    return new Promise((ff, rj) => {
238
      // TODO: Make it class Message.
239
      this.queue[message_id] = {
240
        ff,
241
        data_type: config.data_type,
242
        sent_time: config.timer ? Date.now() : null,
243
        timeout: sett(config.timeout, () => {
244
          if(this.queue[message_id]) {
245
            this.call('timeout', message_data)
246
            rj({
247
              'Websocket timeout expired: ': config.timeout,
248
              'for the message ': message_data
249
            })
250
            delete this.queue[message_id]
251
          }
252
        })
253
      }
254
    })
255
  }
256
257
  // TODO: Add .on handlers to config!
258
  constructor(user_config: wsc.UserConfig = {}) {
259
    this.config = processConfig(user_config)
260
    this.init_flush()
261
    if(!this.config.lazy) this.connect()
262
  }
263
}
264
265
/* TODO: v3: @.deprecated. Use named import { WebSocketClient } instead. */
266
export default WebSocketClient